HDFS的Java API操作
一、HDFS客户端环境准备
1)根据自己电脑的操作系统拷贝对应的编译后的hadoop jar包到非中文路径
2)配置HADOOP_HOME环境变量,并将其bin目录添加到Path环境变量
二、HDFS的API操作
新建Maven工程并添加依赖
<dependencies>
    <!-- Unit-test framework. Pinned to a fixed version: the RELEASE meta-version
         is deprecated in Maven 3 and makes builds non-reproducible. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>
    <!-- Logging backend.
         NOTE(review): log4j-core 2.8.2 is within the version range affected by
         CVE-2021-44228 (Log4Shell); consider moving to a patched 2.x release. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <!-- Hadoop artifacts; keep all three on the same version. -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>
API操作代码
1、HDFS文件上传
/**
 * Uploads the local file {@code f:/hello.txt} to HDFS as
 * {@code /0308_666/hello1.txt}, connecting to {@code hdfs://hadoop102:9000}
 * as user {@code drift}.
 *
 * @throws Exception if the client cannot be created or the copy fails
 */
public void testPut() throws Exception {
    Configuration configuration = new Configuration();
    // try-with-resources closes the client even when the copy throws.
    try (FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift")) {
        fileSystem.copyFromLocalFile(
                new Path("f:/hello.txt"), new Path("/0308_666/hello1.txt"));
    }
}
2、HDFS文件下载
/**
 * Downloads {@code /0308_666/hello.txt} from HDFS to the local file
 * {@code f:/hello1.txt} and prints {@code over} when done.
 *
 * @throws Exception if the client cannot be created or the copy fails
 */
public void testDownload() throws Exception {
    // 1. Obtain the file system client; it is closed automatically, even on failure.
    Configuration configuration = new Configuration();
    try (FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift")) {
        // 2. Download. First argument (delSrc) = false keeps the HDFS source;
        //    last argument (useRawLocalFileSystem) = true writes through the raw
        //    local file system.
        fileSystem.copyToLocalFile(
                false,
                new Path("/0308_666/hello.txt"),
                new Path("f:/hello1.txt"),
                true);
    }
    // 3. The client has been released at this point.
    System.out.println("over");
}
3、HDFS文件夹删除
/**
 * Recursively deletes the HDFS path {@code /0308_777} and prints {@code over}.
 *
 * @throws Exception if the client cannot be created or the delete call fails
 */
public void delete() throws Exception {
    // 1. Obtain the file system client; it is closed automatically, even on failure.
    Configuration configuration = new Configuration();
    try (FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift")) {
        // 2. Delete; the second argument enables recursive deletion.
        //    NOTE: the boolean result of delete() is not inspected here.
        fileSystem.delete(new Path("/0308_777"), true);
    }
    // 3. The client has been released at this point.
    System.out.println("over");
}
4、HDFS文件名更改
/**
 * Renames the HDFS file {@code /0308_666/hello.txt} to
 * {@code /0308_666/hello2.txt} and prints {@code over}.
 *
 * @throws Exception if the client cannot be created or the rename call fails
 */
public void testRename() throws Exception {
    // 1. Obtain the file system client; it is closed automatically, even on failure.
    Configuration configuration = new Configuration();
    try (FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift")) {
        // 2. Rename. NOTE: the boolean result of rename() is not inspected here.
        fileSystem.rename(
                new Path("/0308_666/hello.txt"), new Path("/0308_666/hello2.txt"));
    }
    // 3. The client has been released at this point.
    System.out.println("over");
}
5、HDFS文件详情查看
/**
 * Recursively lists every file under the HDFS root and prints, per file, its
 * length, name, permission and the hosts of each of its block locations,
 * followed by a separator line.
 *
 * @throws Exception if the client cannot be created or the listing fails
 */
public void testLS1() throws Exception {
    // 1. Obtain the file system client; it is closed automatically, even if
    //    iteration fails part-way through.
    Configuration configuration = new Configuration();
    try (FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift")) {
        // 2. Query file information (second argument = recursive listing).
        RemoteIterator<LocatedFileStatus> listFiles =
                fileSystem.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();
            // File length
            System.out.println(fileStatus.getLen());
            // File name
            System.out.println(fileStatus.getPath().getName());
            // File permission
            System.out.println(fileStatus.getPermission());
            // Hosts reported for each block of the file
            for (BlockLocation location : fileStatus.getBlockLocations()) {
                for (String host : location.getHosts()) {
                    System.out.println(host);
                }
            }
            System.out.println("---------------分割线---------------");
        }
    }
}
6、HDFS文件和文件夹判断
/**
 * Lists the direct children of the HDFS root and prints each name prefixed
 * with {@code F:} for files and {@code D:} for everything else.
 *
 * @throws Exception if the client cannot be created or the listing fails
 */
public void testLS2() throws Exception {
    // 1. Obtain the file system client; it is closed automatically, even on failure.
    Configuration configuration = new Configuration();
    try (FileSystem fileSystem = FileSystem.get(
            new URI("hdfs://hadoop102:9000"), configuration, "drift")) {
        // 2. Distinguish files from non-files.
        for (FileStatus fileStatus : fileSystem.listStatus(new Path("/"))) {
            String prefix = fileStatus.isFile() ? "F:" : "D:";
            System.out.println(prefix + fileStatus.getPath().getName());
        }
    }
}
HDFS的I/O流操作
1、HDFS文件上传
/**
 * Uploads the local file {@code f:/hello2.txt} to HDFS as
 * {@code /0308_666/hello3.txt} by copying between raw streams.
 *
 * @throws Exception if the client or either stream cannot be opened, or the copy fails
 */
public void testPut2() throws Exception {
    Configuration configuration = new Configuration();
    // Resources are closed in reverse order of declaration (output stream,
    // input stream, then the client), also when the copy throws.
    try (FileSystem fileSystem = FileSystem.get(
                 new URI("hdfs://hadoop102:9000"), configuration, "drift");
         // Local source
         FileInputStream fileInputStream =
                 new FileInputStream(new File("f:/hello2.txt"));
         // HDFS destination
         FSDataOutputStream outputStream =
                 fileSystem.create(new Path("/0308_666/hello3.txt"))) {
        // close=false: closing is left to try-with-resources above.
        IOUtils.copyBytes(fileInputStream, outputStream, configuration, false);
    }
}
2、HDFS文件下载
/**
 * Downloads {@code /0308_666/hello3.txt} from HDFS to the local file
 * {@code f:/hello3.txt} by copying between raw streams, then prints {@code over}.
 *
 * @throws Exception if the client or either stream cannot be opened, or the copy fails
 */
public void testDownload2() throws Exception {
    Configuration configuration = new Configuration();
    // Resources are closed in reverse order of declaration (output stream,
    // input stream, then the client), also when the copy throws.
    try (FileSystem fileSystem = FileSystem.get(
                 new URI("hdfs://hadoop102:9000"), configuration, "drift");
         // HDFS source
         FSDataInputStream inputStream =
                 fileSystem.open(new Path("/0308_666/hello3.txt"));
         // Local destination
         FileOutputStream outputStream =
                 new FileOutputStream(new File("f:/hello3.txt"))) {
        // close=false: closing is left to try-with-resources above.
        IOUtils.copyBytes(inputStream, outputStream, configuration, false);
    }
    System.out.println("over");
}
3、定位文件读取
需求:分块读取HDFS上的大文件
/**
 * File download, step 1: fetch the first part.
 *
 * <p>Copies at most the first 128 MB of
 * {@code /user/drift/hadoop-2.7.2.tar.gz} into
 * {@code f:/hadoop-2.7.2.tar.gz.part1}. Only the bytes actually returned by
 * each read are written, so short reads and files smaller than 128 MB do not
 * add stale buffer content to the output.
 *
 * @throws Exception if the client or either stream cannot be opened, or the copy fails
 */
public void testSeek1() throws Exception {
    // Size of the first part: 128 MB (assumes the file was stored with a
    // 128 MB block size; adjust if the cluster is configured differently).
    final long partBytes = 128L * 1024 * 1024;

    // 1. Obtain the HDFS client, 2. open the input stream, 3. open the output
    //    stream. All three are closed automatically, even when the copy throws.
    Configuration configuration = new Configuration();
    try (FileSystem fileSystem = FileSystem.get(
                 new URI("hdfs://hadoop102:9000"), configuration, "drift");
         FSDataInputStream inputStream =
                 fileSystem.open(new Path("/user/drift/hadoop-2.7.2.tar.gz"));
         FileOutputStream outputStream =
                 new FileOutputStream(new File("f:/hadoop-2.7.2.tar.gz.part1"))) {
        // 4. Copy until partBytes have been transferred or the source ends.
        byte[] buf = new byte[1024];
        long remaining = partBytes;
        while (remaining > 0) {
            int wanted = (int) Math.min(buf.length, remaining);
            int read = inputStream.read(buf, 0, wanted);
            if (read < 0) {
                // End of file reached before the part was full.
                break;
            }
            outputStream.write(buf, 0, read);
            remaining -= read;
        }
    }
    // 5. Resources have been released at this point.
    System.out.println("over");
}
/**
 * File download, step 2: fetch the second part.
 *
 * <p>Seeks 128 MB into {@code /user/drift/hadoop-2.7.2.tar.gz} and copies
 * everything from that offset to the end of the file into
 * {@code f:/hadoop-2.7.2.tar.gz.part2}.
 *
 * @throws Exception if the client or either stream cannot be opened, or the
 *         seek or copy fails
 */
public void testSeek2() throws Exception {
    // Offset where the second part starts: 128 MB, written as a long so the
    // arithmetic cannot overflow if the size is ever increased.
    final long partOffset = 128L * 1024 * 1024;

    // 1. Obtain the HDFS client, 2. open the input stream, 3. open the output
    //    stream. All three are closed automatically, even when the copy throws.
    Configuration configuration = new Configuration();
    try (FileSystem fileSystem = FileSystem.get(
                 new URI("hdfs://hadoop102:9000"), configuration, "drift");
         FSDataInputStream inputStream =
                 fileSystem.open(new Path("/user/drift/hadoop-2.7.2.tar.gz"));
         FileOutputStream outputStream =
                 new FileOutputStream(new File("f:/hadoop-2.7.2.tar.gz.part2"))) {
        // 4. Skip the first part, then copy the remainder.
        inputStream.seek(partOffset);
        // close=false: closing is left to try-with-resources above.
        IOUtils.copyBytes(inputStream, outputStream, configuration, false);
    }
    // 5. Resources have been released at this point.
    System.out.println("over");
}
}